package pw.avvero.test.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

import java.util.HashMap;
import java.util.Map;

public class RecordSnapshotMapper {

    /**
     * Converts a {@link ConsumerRecord} to a {@link RecordSnapshot} while filtering
     * headers based on a provided map of bound headers. This method is designed to
     * extract and use only the relevant headers that are present in both the original
     * Kafka record and the provided map of bound headers. The bound headers map is
     * typically generated by Spring and may contain additional metadata or system-related
     * headers that are not part of the original record's headers.
     *
     * @param record The Kafka {@link ConsumerRecord} to be converted.
     * @param boundedHeaders A map of headers provided by Spring that includes both
     *                       user-defined headers and potentially extraneous system headers.
     *                       unrelated or system headers.
     * @return A {@link RecordSnapshot} instance containing the filtered headers, key,
     *         and value from the original Kafka record.
     */
    public RecordSnapshot recordToSnapshot(ConsumerRecord<Object, Object> record, Map<String, Object> boundedHeaders) {
        Map<String, Object> headers = new HashMap<>();
        for (Header recordHeader : record.headers()) {
            headers.put(recordHeader.key(), boundedHeaders.get(recordHeader.key()));
        }
        return RecordSnapshot.builder()
                .topic(record.topic())
                .key(record.key() != null ? record.key() : "") // TODO no key?
                .headers(headers)
                .value(record.value())
                .build();
    }
}
